﻿using iTool.Common.Options;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace iTool.ClusterComponent
{
    public class SQLServerQueueStorageProvider : IQueueStorageProvider
    {
        private Subject<BatchItem<MessageData>> _subject;

        public string QueueId { get; set; }
        private string QueueTableName { get; set; }

        private AdoNetOptions options;
        string connection;

        private void Ini(AdoNetOptions options, string queueId)
        {
            // Set Connection option
            {
                this.options = options;
                this.connection = this.options.GetConnection(this.options.QueueDatabaseName);
            }

            // 批处理器
            {
                _subject = new Subject<BatchItem<MessageData>>();
                _subject.Buffer(TimeSpan.FromMilliseconds(10), 100) // 100毫秒 或者 100 条
                    .Where(x => x.Count > 0)
                    .Select(list => Observable.FromAsync(() => BatchWriteAsync(list)))
                    .Concat()
                    .Subscribe();
            }

            // 检查存储表创建
            {
                // 创建实例
                this.QueueId = queueId;
                this.QueueTableName = $"ClusterQueueTable_{this.QueueId}";
                // 创建表脚本
                string createTable = $@"CREATE TABLE {this.QueueTableName}
							(
								offset int primary key not null,
								payload nvarchar(max)
							);";

                using (SqlConnection conn = new SqlConnection(this.connection))
                {
                    List<MessageData> messages = new List<MessageData>();
                    using (SqlCommand command = new SqlCommand($"select 1 from sysobjects where id=object_Id('{this.QueueTableName}')", conn))
                    {
                        conn.Open();
                        var result = command.ExecuteScalar();

                        if (result == null)
                        {
                            // 不存在创建表
                            command.CommandText = createTable;
                            command.ExecuteNonQuery();
                        }
                    }
                }

                
            }
        }

        public IQueueStorageProvider CreateInstance(AdoNetOptions options, string queueId)
        {
            queueId = queueId.Replace("-", "_");
            var queueStorageProvider = new SQLServerQueueStorageProvider();
            queueStorageProvider.Ini(options, queueId);
            return queueStorageProvider;
        }

        public async Task<IEnumerable<MessageData>> DequeueAsync(long startOffset, int maxCount)
        {
            await using (SqlConnection conn = new SqlConnection(this.connection))
            {
                List<MessageData> messages = new List<MessageData>();
                await using (SqlCommand command = new SqlCommand($"select top {maxCount} payload from {this.QueueTableName} where offset >= @startOffset", conn))
                {
                    command.Parameters.AddRange(new SqlParameter[]
                    {
                            new SqlParameter("startOffset",startOffset)
                    });
                    await conn.OpenAsync();
                    var reader = await command.ExecuteReaderAsync();

                    if (reader != null)
                    {
                        while (await reader.ReadAsync())
                        {
                            var message = reader["payload"].ToString();
                            if (!string.IsNullOrWhiteSpace(message))
                            {
                                messages.Add(iUtils.DeserializeObject<MessageData>(message));
                            }
                        }
                    }
                }

                return messages;
            }

        }

        public async Task EnqueueAsync(MessageData payload)
        {
            // 批量写入
            TaskCompletionSource<Task> source = new TaskCompletionSource<Task>();
            _subject.OnNext(new BatchItem<MessageData>
            {
                Body = payload,
                TaskSource = source
            });
            await source.Task;
        }

        public Task ShutdownAsync()
        {
            return Task.CompletedTask;
        }

        #region Private

        private async Task BatchWriteAsync(IList<BatchItem<MessageData>> messages)
        {
            List<string> sqls = messages
                .Select(item => $"({item.Body.SequenceNumber},'{JsonConvert.SerializeObject(item.Body)}')").ToList();


            await using (SqlConnection conn = new SqlConnection(this.connection))
            {
                await conn.OpenAsync();
                var transaction = conn.BeginTransaction(System.Data.IsolationLevel.ReadUncommitted);
                try
                {
                    await using (SqlCommand command = new SqlCommand($"insert into {this.QueueTableName}(offset,payload) values" + string.Join(',', sqls), conn, transaction))
                    {
                        await command.ExecuteNonQueryAsync();
                    }

                    await transaction.CommitAsync();

                    foreach (var item in messages)
                    {
                        item.TaskSource.TrySetResult(Task.CompletedTask);
                    }
                }
                catch (Exception ex)
                {
                    await transaction.RollbackAsync();
                    foreach (var item in messages)
                    {
                        item.TaskSource.TrySetException(ex);
                    }
                }
            }

        }

        #endregion
    }
}
